// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include <boost/optional/optional.hpp> // IWYU pragma: keep
#include <boost/optional/optional_io.hpp> // IWYU pragma: keep
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>

#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_predicate.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/clock/clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/key_value_test_schema.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

DEFINE_int32(keyspace_size, 2,  "number of distinct primary keys to test with");
DECLARE_bool(enable_maintenance_manager);
DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
DECLARE_bool(use_hybrid_clock);

using boost::optional;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduDelete;
using kudu::client::KuduPredicate;
using kudu::client::KuduScanBatch;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
using kudu::client::KuduUpdate;
using kudu::client::KuduValue;
using kudu::client::KuduWriteOperation;
using kudu::client::sp::shared_ptr;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
using std::list;
using std::map;
using std::string;
using std::vector;
using std::unique_ptr;
using strings::Substitute;

namespace kudu {
namespace tablet {

// The type of operation in a sequence of operations generated by
// the fuzz test.
enum TestOpType {
  TEST_INSERT,
  TEST_INSERT_PK_ONLY,
  TEST_INSERT_IGNORE,
  TEST_INSERT_IGNORE_PK_ONLY,
  TEST_UPSERT,
  TEST_UPSERT_PK_ONLY,
  TEST_UPDATE,
  TEST_DELETE,
  TEST_FLUSH_OPS,
  TEST_FLUSH_TABLET,
  TEST_FLUSH_DELTAS,
  TEST_MINOR_COMPACT_DELTAS,
  TEST_MAJOR_COMPACT_DELTAS,
  TEST_COMPACT_TABLET,
  TEST_RESTART_TS,
  TEST_SCAN_AT_TIMESTAMP,
  TEST_DIFF_SCAN,
  TEST_NUM_OP_TYPES // max value for enum
};

const char* kTableName = "table";
const char* TestOpType_names[] = {
  "TEST_INSERT",
  "TEST_INSERT_PK_ONLY",
  "TEST_INSERT_IGNORE",
  "TEST_INSERT_IGNORE_PK_ONLY",
  "TEST_UPSERT",
  "TEST_UPSERT_PK_ONLY",
  "TEST_UPDATE",
  "TEST_DELETE",
  "TEST_FLUSH_OPS",
  "TEST_FLUSH_TABLET",
  "TEST_FLUSH_DELTAS",
  "TEST_MINOR_COMPACT_DELTAS",
  "TEST_MAJOR_COMPACT_DELTAS",
  "TEST_COMPACT_TABLET",
  "TEST_RESTART_TS",
  "TEST_SCAN_AT_TIMESTAMP",
  "TEST_DIFF_SCAN"
};

// An operation in a fuzz-test sequence.
struct TestOp {
  // NOLINTNEXTLINE(google-explicit-constructor)
  TestOp(TestOpType t, int v1 = 0, int v2 = 0) // NOLINT(runtime/explicit)
      : type(t),
        val(v1),
        val2(v2) {}

  // The op to run.
  TestOpType type;

  // For INSERT/UPSERT/UPDATE/DELETE, the key of the row to be modified.
  // For SCAN_AT_TIMESTAMP the timestamp of the scan.
  // For DIFF_SCAN the start timestamp of the scan.
  // Otherwise, unused.
  int val;

  // For DIFF_SCAN, the end timestamp of the scan.
  // Otherwise, unused.
  int val2;

  string ToString() const {
    switch (type) {
      case TEST_FLUSH_OPS:
      case TEST_FLUSH_TABLET:
      case TEST_COMPACT_TABLET:
      case TEST_FLUSH_DELTAS:
      case TEST_MAJOR_COMPACT_DELTAS:
      case TEST_MINOR_COMPACT_DELTAS:
      case TEST_RESTART_TS:
        return strings::Substitute("{$0}", TestOpType_names[type]);
      case TEST_INSERT:
      case TEST_INSERT_PK_ONLY:
      case TEST_INSERT_IGNORE:
      case TEST_INSERT_IGNORE_PK_ONLY:
      case TEST_UPSERT:
      case TEST_UPSERT_PK_ONLY:
      case TEST_UPDATE:
      case TEST_DELETE:
      case TEST_SCAN_AT_TIMESTAMP:
        return strings::Substitute("{$0, $1}", TestOpType_names[type], val);
      case TEST_DIFF_SCAN:
        return strings::Substitute("{$0, $1, $2}", TestOpType_names[type], val, val2);
      default:
        LOG(FATAL) << "Invalid op type: " << type;
    }
    __builtin_unreachable();
  }
};

enum RedoType {
  INSERT,
  UPDATE,
  DELETE,
};

struct Redo {
  Redo(RedoType t, int32_t k, optional<int32_t> v = boost::none)
      : rtype(t),
        key(k),
        val(std::move(v)) {}

  string ToString() const {
    if (rtype == DELETE) {
      return strings::Substitute("{DELETE key=$0}", key);
    }
    return strings::Substitute("{$0 key=$1 val=$2}",
                               rtype == INSERT ? "INSERT" : "UPDATE", key,
                               val ? std::to_string(*val) : "NULL");
  }
  RedoType rtype;
  int32_t key;
  optional<int32_t> val;
};

const vector<TestOpType> kAllOps {TEST_INSERT,
                                  TEST_INSERT_PK_ONLY,
                                  TEST_INSERT_IGNORE,
                                  TEST_INSERT_IGNORE_PK_ONLY,
                                  TEST_UPSERT,
                                  TEST_UPSERT_PK_ONLY,
                                  TEST_UPDATE,
                                  TEST_DELETE,
                                  TEST_FLUSH_OPS,
                                  TEST_FLUSH_TABLET,
                                  TEST_FLUSH_DELTAS,
                                  TEST_MINOR_COMPACT_DELTAS,
                                  TEST_MAJOR_COMPACT_DELTAS,
                                  TEST_COMPACT_TABLET,
                                  TEST_RESTART_TS,
                                  TEST_SCAN_AT_TIMESTAMP,
                                  TEST_DIFF_SCAN};

const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                     TEST_INSERT_IGNORE_PK_ONLY,
                                     TEST_UPSERT_PK_ONLY,
                                     TEST_DELETE,
                                     TEST_FLUSH_OPS,
                                     TEST_FLUSH_TABLET,
                                     TEST_FLUSH_DELTAS,
                                     TEST_MINOR_COMPACT_DELTAS,
                                     TEST_MAJOR_COMPACT_DELTAS,
                                     TEST_COMPACT_TABLET,
                                     TEST_RESTART_TS,
                                     TEST_SCAN_AT_TIMESTAMP,
                                     TEST_DIFF_SCAN};

// Test which does only random operations against a tablet, including update and random
// get (ie scans with equal lower and upper bounds).
//
// The test maintains an in-memory copy of the expected state of the tablet, and uses only
// a single thread, so that it's easy to verify that the tablet always matches the expected
// state.
class FuzzTest : public KuduTest {
 public:
  FuzzTest() {
    FLAGS_enable_maintenance_manager = false;
    FLAGS_use_hybrid_clock = false;
    FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps = true;
  }

  void CreateTabletAndStartClusterWithSchema(const Schema& schema) {
    schema_ =  KuduSchema::FromSchema(schema);
    KuduTest::SetUp();

    InternalMiniClusterOptions opts;
    cluster_.reset(new InternalMiniCluster(env_, opts));
    ASSERT_OK(cluster_->Start());
    CHECK_OK(KuduClientBuilder()
             .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str())
             .default_admin_operation_timeout(MonoDelta::FromSeconds(60))
             .Build(&client_));
    // Add a table, make sure it reports itself.
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    CHECK_OK(table_creator->table_name(kTableName)
             .schema(&schema_)
             .set_range_partition_columns({ "key" })
             .num_replicas(1)
             .Create());

    // Find the replica.
    tablet_replica_ = LookupTabletReplica();

    // Setup session and table.
    session_ = client_->NewSession();
    CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH));
    session_->SetTimeoutMillis(60 * 1000);
    CHECK_OK(client_->OpenTable(kTableName, &table_));
  }

  void TearDown() override {
    if (tablet_replica_) tablet_replica_.reset();
    if (cluster_) cluster_->Shutdown();
  }

  scoped_refptr<TabletReplica> LookupTabletReplica() {
    vector<scoped_refptr<TabletReplica> > replicas;
    cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplicas(&replicas);
    CHECK_EQ(1, replicas.size());
    return replicas[0];
  }

  void RestartTabletServer() {
    tablet_replica_.reset();
    auto ts = cluster_->mini_tablet_server(0);
    if (ts->server()) {
      ts->Shutdown();
      ASSERT_OK(ts->Restart());
    } else {
      ASSERT_OK(ts->Start());
    }
    ASSERT_OK(ts->server()->WaitInited());

    tablet_replica_ = LookupTabletReplica();
  }

  Tablet* tablet() const {
    return tablet_replica_->tablet();
  }

  // Adds an insert for the given key/value pair to 'ops', returning the new contents
  // of the row.
  ExpectedKeyValueRow InsertOrUpsertRow(int key, int val,
                                        optional<ExpectedKeyValueRow> old_row,
                                        TestOpType type) {
    ExpectedKeyValueRow ret;
    unique_ptr<KuduWriteOperation> op;
    if (type == TEST_INSERT || type == TEST_INSERT_PK_ONLY) {
      op.reset(table_->NewInsert());
    } else if (type == TEST_INSERT_IGNORE || type == TEST_INSERT_IGNORE_PK_ONLY) {
      op.reset(table_->NewInsertIgnore());
    } else {
      op.reset(table_->NewUpsert());
    }
    KuduPartialRow* row = op->mutable_row();
    CHECK_OK(row->SetInt32(0, key));
    ret.key = key;
    switch (type) {
      case TEST_INSERT:
      case TEST_INSERT_IGNORE:
      case TEST_UPSERT: {
        if (val & 1) {
          CHECK_OK(row->SetNull(1));
        } else {
          CHECK_OK(row->SetInt32(1, val));
          ret.val = val;
        }
        if (type == TEST_INSERT_IGNORE && old_row) {
          // insert ignore when the row already exists results in old value
          ret.val = old_row->val;
        }
        break;
      }
      case TEST_INSERT_PK_ONLY:
        break;
      case TEST_INSERT_IGNORE_PK_ONLY:
      case TEST_UPSERT_PK_ONLY: {
        // For "upsert PK only" and "insert ignore PK only", we expect the row
        // to keep its old value if the row existed, or NULL if there was no old row.
        ret.val = old_row ? old_row->val : boost::none;
        break;
      }
      default: LOG(FATAL) << "Invalid test op type: " << TestOpType_names[type];
    }
    CHECK_OK(session_->Apply(op.release()));
    return ret;
  }

  // Adds an update of the given key/value pair to 'ops', returning the new contents
  // of the row.
  ExpectedKeyValueRow MutateRow(int key, uint32_t new_val) {
    ExpectedKeyValueRow ret;
    unique_ptr<KuduUpdate> update(table_->NewUpdate());
    KuduPartialRow* row = update->mutable_row();
    CHECK_OK(row->SetInt32(0, key));
    ret.key = key;
    if (new_val & 1) {
      CHECK_OK(row->SetNull(1));
    } else {
      CHECK_OK(row->SetInt32(1, new_val));
      ret.val = new_val;
    }
    CHECK_OK(session_->Apply(update.release()));
    return ret;
  }

  // Adds a delete of the given row to 'ops', returning boost::none (indicating that
  // the row no longer exists).
  optional<ExpectedKeyValueRow> DeleteRow(int key) {
    unique_ptr<KuduDelete> del(table_->NewDelete());
    KuduPartialRow* row = del->mutable_row();
    CHECK_OK(row->SetInt32(0, key));
    CHECK_OK(session_->Apply(del.release()));
    return boost::none;
  }

  // Random-read the given row, returning its current value.
  // If the row doesn't exist, returns boost::none.
  optional<ExpectedKeyValueRow> GetRow(int key) {
    KuduScanner s(table_.get());
    CHECK_OK(s.AddConjunctPredicate(table_->NewComparisonPredicate(
        "key", KuduPredicate::EQUAL, KuduValue::FromInt(key))));
    CHECK_OK(s.Open());
    while (s.HasMoreRows()) {
      KuduScanBatch batch;
      CHECK_OK(s.NextBatch(&batch));
      for (KuduScanBatch::RowPtr row : batch) {
        ExpectedKeyValueRow ret;
        CHECK_OK(row.GetInt32(0, &ret.key));
        if (schema_.num_columns() > 1 && !row.IsNull(1)) {
          ret.val = 0;
          CHECK_OK(row.GetInt32(1, ret.val.get_ptr()));
        }
        return ret;
      }
    }
    return boost::none;
  }

  // Checks that the rows in 'found' match the state we've stored 'saved_values_' corresponding
  // to 'timestamp'. 'errors' collects the errors found. If 'errors' is not empty it means the
  // check failed.
  void CheckRowsMatchAtTimestamp(int timestamp,
                                 vector<ExpectedKeyValueRow> rows_found,
                                 list<string>* errors) {
    int saved_timestamp = -1;
    auto iter = saved_values_.upper_bound(timestamp);
    if (iter == saved_values_.end()) {
      if (!rows_found.empty()) {
        for (auto& found_row : rows_found) {
          errors->push_back(Substitute("Found unexpected row: $0", found_row.ToString()));
        }
      }
    } else {
      saved_timestamp = iter->first;
      const auto& saved = (*iter).second;
      int found_idx = 0;
      int expected_values_counter = 0;
      for (auto& expected : saved) {
        if (expected) {
          expected_values_counter++;
          ExpectedKeyValueRow expected_val = expected.value();
          if (found_idx >= rows_found.size()) {
            errors->push_back(Substitute("Didn't find expected value: $0",
                                         expected_val.ToString()));
            break;
          }
          ExpectedKeyValueRow found_val = rows_found[found_idx++];
          if (expected_val.key != found_val.key) {
            errors->push_back(Substitute("Mismached key. Expected: $0 Found: $1",
                                         expected_val.ToString(), found_val.ToString()));
            continue;
          }
          if (expected_val.val != found_val.val) {
            errors->push_back(Substitute("Mismached value. Expected: $0 Found: $1",
                                         expected_val.ToString(), found_val.ToString()));
            continue;
          }
        }
      }
      if (rows_found.size() != expected_values_counter) {
        errors->push_back(Substitute("Mismatched size. Expected: $0 rows. Found: $1 rows.",
                                     expected_values_counter, rows_found.size()));
        for (auto& found_row : rows_found) {
          errors->push_back(Substitute("Found unexpected row: $0", found_row.ToString()));
        }
      }
    }
    if (!errors->empty()) {
      errors->push_front(Substitute("Found errors while comparing a snapshot scan at $0 with the "
                                    "values saved at $1. Errors:",
                                    timestamp, saved_timestamp));
    }
  }

  // Fully consume all rows in 'scanner', writing the results to 'rows'.
  //
  // If 'is_deleted' is provided (only in diff scans), will also write out the
  // values of the IS_DELETED virtual column.
  Status ScanAllRows(KuduScanner* scanner, vector<ExpectedKeyValueRow>* rows,
                     vector<bool>* is_deleted) {
    while (scanner->HasMoreRows()) {
      KuduScanBatch batch;
      RETURN_NOT_OK(scanner->NextBatch(&batch));
      for (KuduScanBatch::RowPtr row : batch) {
        ExpectedKeyValueRow ret;
        RETURN_NOT_OK(row.GetInt32(0, &ret.key));
        if (schema_.num_columns() == 2 && !row.IsNull(1)) {
          ret.val = 0;
          RETURN_NOT_OK(row.GetInt32(1, ret.val.get_ptr()));
        }
        if (is_deleted) {
          bool b;
          RETURN_NOT_OK(row.IsDeleted(&b));
          is_deleted->push_back(b);
        }
        rows->emplace_back(std::move(ret));
      }
    }
    return Status::OK();
  }

  // Scan the tablet at 'timestamp' and compare the result to the saved values.
  void CheckScanAtTimestamp(int timestamp) {
    KuduScanner s(table_.get());
    ASSERT_OK(s.SetReadMode(KuduScanner::ReadMode::READ_AT_SNAPSHOT));
    ASSERT_OK(s.SetSnapshotRaw(timestamp));
    ASSERT_OK(s.SetFaultTolerant());
    ASSERT_OK(s.Open());

    vector<ExpectedKeyValueRow> found;
    ASSERT_OK(ScanAllRows(&s, &found, nullptr));

    list<string> errors;
    CheckRowsMatchAtTimestamp(timestamp, std::move(found), &errors);

    string final_error;
    if (!errors.empty()) {
      for (const string& error : errors) {
        final_error.append("\n" + error);
      }
      FAIL() << final_error;
    }
  }

  // Diff scan the tablet from 'start_timestamp' to 'end_timestamp' and compare
  // the result to the saved values.
  void CheckDiffScan(int start_timestamp, int end_timestamp) {
    KuduScanner s(table_.get());
    ASSERT_OK(s.SetDiffScan(start_timestamp, end_timestamp));
    ASSERT_OK(s.Open());

    vector<ExpectedKeyValueRow> found;
    vector<bool> found_is_deleted;
    ASSERT_OK(ScanAllRows(&s, &found, &found_is_deleted));

    if (VLOG_IS_ON(1)) {
      for (int i = 0; i < found.size(); i++) {
        VLOG(1) << Substitute("Diff scan result: $0$1", found[i].ToString(),
                              found_is_deleted[i] ? " (deleted)" : "");
      }
    }

    // Use saved_redos_ to reconstruct the expected results of the diff scan.
    //
    // 'selected_rows' tracks which row keys are expected in the scan results
    // using the select criteria.
    //
    // 'expected_rows' tracks expected values of rows and is built up using the
    // apply criteria. After we've processed all relevant deltas, rows not in
    // 'selected_rows' will be pruned and the results compared with the diff scan.
    //
    // 'is_deleted_start' and 'is_deleted_end' track liveness for each row at
    // the beginning and end of the time range. If a row is dead in both, it
    // shouldn't be in the diff scan results.
    vector<bool> selected_rows(FLAGS_keyspace_size);
    vector<ExpectedKeyValueRow> expected_rows(FLAGS_keyspace_size);
    vector<bool> is_deleted_start(FLAGS_keyspace_size, true);
    vector<bool> is_deleted_end(FLAGS_keyspace_size, true);

    for (const auto& e : saved_redos_) {
      int ts = e.first;
      const auto& redos = e.second;
      if (redos.empty()) {
        continue;
      }
      VLOG(1) << "Processing redos for ts @" << ts;

      if (ts >= end_timestamp) {
        // The redo is beyond the end of the diff scan as per both the select
        // and apply criteria. We're iterating in ascending timestamp order so
        // this also means all future redos are irrelevant.
        if (VLOG_IS_ON(1)) {
          for (const auto& redo : redos) {
            VLOG(1) << "Skipping redo " << redo.ToString();
          }
          continue;
        }
        break;
      }

      for (const auto& redo : redos) {
        VLOG(1) << "Processing redo " << redo.ToString();
        if (ts >= start_timestamp) {
          // The redo is relevant as per the select criteria.
          VLOG(1) << "Selected row " << redo.key;

          if (!selected_rows[redo.key]) {
            // This is the first relevant redo for this row.
            is_deleted_start[redo.key] = redo.rtype == INSERT;
            selected_rows[redo.key] = true;
          }
        }

        // The redo is relevant as per the apply criteria.
        is_deleted_end[redo.key] = redo.rtype == DELETE;
        if (redo.rtype != DELETE) {
          // Deleted rows still exist in 'expected_rows'. This is OK;
          // 'expected_is_deleted' will reflect the deletion.
          expected_rows[redo.key] = { redo.key, redo.val };
        }
        VLOG(1) << "New value for row " << redo.key << ": "
                << expected_rows[redo.key].ToString();
        VLOG(1) << "New is_deleted for row " << redo.key << ": "
                << is_deleted_end[redo.key];
      }
    }
    vector<bool> expected_is_deleted = is_deleted_end;

    // Trim the expected results as per 'selected_rows' and start/end liveness.
    int row_key = 0;
    expected_rows.erase(std::remove_if(
        expected_rows.begin(), expected_rows.end(),
        [&](const ExpectedKeyValueRow& /*row*/) {
          bool retval = !selected_rows[row_key] ||
                        (is_deleted_start[row_key] && is_deleted_end[row_key]);
          row_key++;
          return retval;
        }), expected_rows.end());
    row_key = 0;
    expected_is_deleted.erase(std::remove_if(
        expected_is_deleted.begin(), expected_is_deleted.end(),
        [&](bool /*is_deleted*/) {
          bool retval = !selected_rows[row_key] ||
                        (is_deleted_start[row_key] && is_deleted_end[row_key]);
          row_key++;
          return retval;
        }), expected_is_deleted.end());

    // Test the results. Note that for deleted rows, we can't compare column
    // values; the server is free to pick whatever historical values it wants.
    auto fail_diff_scan = [&]() {
      FAIL() << "Diff scan verification failed\n"
             << "Expected IS_DELETED: " << expected_is_deleted << "\n"
             << "Found IS_DELETED: " << found_is_deleted << "\n"
             << "Expected rows: " << expected_rows  << "\n"
             << "Found rows: " << found;
    };
    if (expected_is_deleted != found_is_deleted) {
      NO_FATALS(fail_diff_scan());
    }
    if (expected_rows.size() != found.size()) {
      NO_FATALS(fail_diff_scan());
    }
    for (int i = 0; i < expected_rows.size(); i++) {
      if ((expected_is_deleted[i] && expected_rows[i].key != found[i].key) ||
          (!expected_is_deleted[i] && expected_rows[i] != found[i])) {
        NO_FATALS(fail_diff_scan());
      }
    }
  }

 protected:
  // Validate that the given sequence is valid and would not cause any
  // errors assuming that there are no bugs. For example, checks to make sure there
  // aren't duplicate inserts with no intervening deletions.
  //
  // Useful when using the 'delta' test case reduction tool to allow
  // it to skip invalid test cases.
  void ValidateFuzzCase(const vector<TestOp>& test_ops);
  void RunFuzzCase(const vector<TestOp>& test_ops,
                   int update_multiplier);

  KuduSchema schema_;
  unique_ptr<InternalMiniCluster> cluster_;
  shared_ptr<KuduClient> client_;
  shared_ptr<KuduSession> session_;
  shared_ptr<KuduTable> table_;

  map<int,
      vector<optional<ExpectedKeyValueRow>>,
      std::greater<int>> saved_values_;

  map<int, vector<Redo>> saved_redos_;

  scoped_refptr<TabletReplica> tablet_replica_;
};

// The set of ops to draw from.
enum TestOpSets {
  ALL, // Pick an operation at random from all possible operations.
  PK_ONLY // Pick an operation at random from the set of operations that apply only to the
          // primary key (or that are now row-specific, like flushes or compactions).
};

TestOpType PickOpAtRandom(TestOpSets sets) {
  switch (sets) {
    case ALL:
      return kAllOps[rand() % kAllOps.size()];
    case PK_ONLY:
      return kPkOnlyOps[rand() % kPkOnlyOps.size()];
    default:
      LOG(FATAL) << "Unknown TestOpSets type: " << sets;
  }
  __builtin_unreachable();
}

bool IsMutation(const TestOpType& op) {
  switch (op) {
    case TEST_INSERT:
    case TEST_INSERT_PK_ONLY:
    case TEST_INSERT_IGNORE:
    case TEST_INSERT_IGNORE_PK_ONLY:
    case TEST_UPSERT:
    case TEST_UPSERT_PK_ONLY:
    case TEST_UPDATE:
    case TEST_DELETE:
      return true;
    default:
      return false;
  }
}

// Generate a random valid sequence of operations for use as a
// fuzz test.
void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
  vector<bool> exists(FLAGS_keyspace_size);
  int op_timestamps = 0;
  bool ops_pending = false;
  bool data_in_mrs = false;
  bool worth_compacting = false;
  bool data_in_dms = false;
  ops->clear();
  while (ops->size() < len) {
    TestOpType r = PickOpAtRandom(sets);
    int row_key = rand() % FLAGS_keyspace_size;

    // When we perform a test mutation, we also call GetRow() which does a scan
    // and thus increases the server's timestamp.
    if (IsMutation(r)) {
      op_timestamps++;
    }

    switch (r) {
      case TEST_INSERT:
      case TEST_INSERT_PK_ONLY:
        if (exists[row_key]) continue;
        ops->emplace_back(r, row_key);
        exists[row_key] = true;
        ops_pending = true;
        data_in_mrs = true;
        break;
      case TEST_INSERT_IGNORE:
      case TEST_INSERT_IGNORE_PK_ONLY:
        ops->emplace_back(r, row_key);
        ops_pending = true;
        // If the row doesn't currently exist, this will act like an insert
        // and put it into MRS.
        if (!exists[row_key]) {
          data_in_mrs = true;
        }
        exists[row_key] = true;
        break;
      case TEST_UPSERT:
      case TEST_UPSERT_PK_ONLY:
        ops->emplace_back(r, row_key);
        ops_pending = true;
        // If the row doesn't currently exist, this will act like an insert
        // and put it into MRS.
        if (!exists[row_key]) {
          data_in_mrs = true;
        } else if (!data_in_mrs) {
          // If it does exist, but not in MRS, then this will put data into
          // a DMS.
          data_in_dms = true;
        }
        exists[row_key] = true;
        break;
      case TEST_UPDATE:
        if (!exists[row_key]) continue;
        ops->emplace_back(TEST_UPDATE, row_key);
        ops_pending = true;
        if (!data_in_mrs) {
          data_in_dms = true;
        }
        break;
      case TEST_DELETE:
        if (!exists[row_key]) continue;
        ops->emplace_back(TEST_DELETE, row_key);
        ops_pending = true;
        exists[row_key] = false;
        if (!data_in_mrs) {
          data_in_dms = true;
        }
        break;
      case TEST_FLUSH_OPS:
        if (ops_pending) {
          ops->emplace_back(TEST_FLUSH_OPS);
          ops_pending = false;
          op_timestamps++;
        }
        break;
      case TEST_FLUSH_TABLET:
        if (data_in_mrs) {
          if (ops_pending) {
            ops->emplace_back(TEST_FLUSH_OPS);
            ops_pending = false;
          }
          ops->emplace_back(TEST_FLUSH_TABLET);
          data_in_mrs = false;
          worth_compacting = true;
        }
        break;
      case TEST_COMPACT_TABLET:
        if (worth_compacting) {
          if (ops_pending) {
            ops->emplace_back(TEST_FLUSH_OPS);
            ops_pending = false;
          }
          ops->emplace_back(TEST_COMPACT_TABLET);
          worth_compacting = false;
        }
        break;
      case TEST_FLUSH_DELTAS:
        if (data_in_dms) {
          if (ops_pending) {
            ops->emplace_back(TEST_FLUSH_OPS);
            ops_pending = false;
          }
          ops->emplace_back(TEST_FLUSH_DELTAS);
          data_in_dms = false;
        }
        break;
      case TEST_MAJOR_COMPACT_DELTAS:
        ops->emplace_back(TEST_MAJOR_COMPACT_DELTAS);
        break;
      case TEST_MINOR_COMPACT_DELTAS:
        ops->emplace_back(TEST_MINOR_COMPACT_DELTAS);
        break;
      case TEST_RESTART_TS:
        ops->emplace_back(TEST_RESTART_TS);
        break;
      case TEST_SCAN_AT_TIMESTAMP: {
        int timestamp = 1;
        if (op_timestamps > 0) {
          timestamp = (rand() % op_timestamps) + 1;
        }
        ops->emplace_back(TEST_SCAN_AT_TIMESTAMP, timestamp);
        break;
      }
      case TEST_DIFF_SCAN: {
        int start_timestamp = 1;
        int end_timestamp = 1;
        if (op_timestamps > 0) {
          start_timestamp = (rand() % op_timestamps) + 1;
          end_timestamp = (rand() % op_timestamps) + 1;
          if (start_timestamp > end_timestamp) {
            std::swap(start_timestamp, end_timestamp);
          }
        }
        ops->emplace_back(TEST_DIFF_SCAN, start_timestamp, end_timestamp);
        break;
      }
      default:
        LOG(FATAL) << "Invalid op type: " << r;
    }
  }
}

string DumpTestCase(const vector<TestOp>& ops) {
  vector<string> strs;
  for (TestOp test_op : ops) {
    strs.push_back(test_op.ToString());
  }
  return JoinStrings(strs, ",\n");
}

void FuzzTest::ValidateFuzzCase(const vector<TestOp>& test_ops) {
  vector<bool> exists(FLAGS_keyspace_size);
  for (const auto& test_op : test_ops) {
    switch (test_op.type) {
      case TEST_INSERT:
      case TEST_INSERT_PK_ONLY:
        CHECK(!exists[test_op.val]) << "invalid case: inserting already-existing row";
        exists[test_op.val] = true;
        break;
      case TEST_INSERT_IGNORE:
      case TEST_INSERT_IGNORE_PK_ONLY:
      case TEST_UPSERT:
      case TEST_UPSERT_PK_ONLY:
        exists[test_op.val] = true;
        break;
      case TEST_UPDATE:
        CHECK(exists[test_op.val]) << "invalid case: updating non-existing row";
        break;
      case TEST_DELETE:
        CHECK(exists[test_op.val]) << "invalid case: deleting non-existing row";
        exists[test_op.val] = false;
        break;
      default:
        break;
    }
  }
}

void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
                           int update_multiplier = 1) {
  ValidateFuzzCase(test_ops);
  // Dump the test case, since we usually run a random one.
  // This dump format is easy for a developer to copy-paste back
  // into a test method in order to reproduce a failure.
  LOG(INFO) << "test case:\n" << DumpTestCase(test_ops);

  vector<optional<ExpectedKeyValueRow>> cur_val(FLAGS_keyspace_size);
  vector<optional<ExpectedKeyValueRow>> pending_val(FLAGS_keyspace_size);
  vector<Redo> pending_redos;

  int i = 0;
  for (const TestOp& test_op : test_ops) {
    if (IsMutation(test_op.type)) {
      EXPECT_EQ(cur_val[test_op.val], GetRow(test_op.val));
    }

    LOG(INFO) << test_op.ToString();
    switch (test_op.type) {
      case TEST_INSERT:
      case TEST_INSERT_PK_ONLY:
      case TEST_INSERT_IGNORE:
      case TEST_INSERT_IGNORE_PK_ONLY:
      case TEST_UPSERT:
      case TEST_UPSERT_PK_ONLY: {
        RedoType rtype = pending_val[test_op.val] ? UPDATE : INSERT;
        pending_val[test_op.val] = InsertOrUpsertRow(
            test_op.val, i++, pending_val[test_op.val], test_op.type);

        // An insert ignore on a row that already exists will be dropped server-side.
        // We must do the same.
        if ((test_op.type == TEST_INSERT_IGNORE || test_op.type == TEST_INSERT_IGNORE_PK_ONLY) &&
            rtype == UPDATE) {
          break;
        }

        // An "upsert PK-only" that is converted into an update will be dropped server-side.
        // We must do the same.
        if (test_op.type == TEST_UPSERT_PK_ONLY && rtype == UPDATE) {
          break;
        }

        pending_redos.emplace_back(rtype, test_op.val, pending_val[test_op.val]->val);
        break;
      }
      case TEST_UPDATE:
        for (int j = 0; j < update_multiplier; j++) {
          pending_val[test_op.val] = MutateRow(test_op.val, i++);
          pending_redos.emplace_back(UPDATE, test_op.val, pending_val[test_op.val]->val);
        }
        break;
      case TEST_DELETE:
        pending_val[test_op.val] = DeleteRow(test_op.val);
        pending_redos.emplace_back(DELETE, test_op.val, boost::none);
        break;
      case TEST_FLUSH_OPS: {
        FlushSessionOrDie(session_);
        cur_val = pending_val;
        int current_time = down_cast<kudu::clock::LogicalClock*>(
            tablet()->clock())->GetCurrentTime();
        VLOG(1) << "Current time: " << current_time;
        saved_values_[current_time] = cur_val;
        saved_redos_[current_time] = pending_redos;
        pending_redos.clear();
        break;
      }
      case TEST_FLUSH_TABLET:
        ASSERT_OK(tablet()->Flush());
        break;
      case TEST_FLUSH_DELTAS:
        ASSERT_OK(tablet()->FlushBiggestDMS());
        break;
      case TEST_MAJOR_COMPACT_DELTAS:
        ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
        break;
      case TEST_MINOR_COMPACT_DELTAS:
        ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION));
        break;
      case TEST_COMPACT_TABLET:
        ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
        break;
      case TEST_RESTART_TS:
        NO_FATALS(RestartTabletServer());
        break;
      case TEST_SCAN_AT_TIMESTAMP:
        NO_FATALS(CheckScanAtTimestamp(test_op.val));
        break;
      case TEST_DIFF_SCAN:
        NO_FATALS(CheckDiffScan(test_op.val, test_op.val2));
        break;
      default:
        LOG(FATAL) << test_op.type;
    }
  }
}

// Generates a random test sequence and runs it.
// The logs of this test are designed to easily be copy-pasted and create
// more specific test cases like TestFuzz<N> below.
TEST_F(FuzzTest, TestRandomFuzzPksOnly) {
  CreateTabletAndStartClusterWithSchema(Schema({ColumnSchema("key", INT32)}, 1));
  SeedRandom();
  vector<TestOp> test_ops;
  GenerateTestCase(&test_ops, AllowSlowTests() ? 1000 : 50, TestOpSets::PK_ONLY);
  RunFuzzCase(test_ops);
}

// Generates a random test sequence and runs it.
// The logs of this test are designed to easily be copy-pasted and create
// more specific test cases like TestFuzz<N> below.
TEST_F(FuzzTest, TestRandomFuzz) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  SeedRandom();
  vector<TestOp> test_ops;
  GenerateTestCase(&test_ops, AllowSlowTests() ? 1000 : 50);
  RunFuzzCase(test_ops);
}

// Generates a random test case, but the UPDATEs are all repeated many times.
// This results in very large batches which are likely to span multiple delta blocks
// when flushed.
TEST_F(FuzzTest, TestRandomFuzzHugeBatches) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  SeedRandom();
  vector<TestOp> test_ops;
  GenerateTestCase(&test_ops, AllowSlowTests() ? 500 : 50);
  int update_multiplier;
#ifdef THREAD_SANITIZER
  // TSAN builds run more slowly, so 500 can cause timeouts.
  update_multiplier = 100;
#else
  update_multiplier = 500;
#endif
  RunFuzzCase(test_ops, update_multiplier);
}

TEST_F(FuzzTest, TestFuzz1) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  vector<TestOp> test_ops = {
      // Get an inserted row in a DRS.
      {TEST_INSERT, 0},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},

      // DELETE in DMS, INSERT in MRS and flush again.
      {TEST_DELETE, 0},
      {TEST_INSERT, 0},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},

      // State:
      // RowSet RowSet(0):
      //   (int32 key=1, int32 val=NULL) Undos: [@1(DELETE)] Redos (in DMS): [@2 DELETE]
      // RowSet RowSet(1):
      //   (int32 key=1, int32 val=NULL) Undos: [@2(DELETE)] Redos: []

      {TEST_COMPACT_TABLET},
  };
  RunFuzzCase(test_ops);
}

// A particular test case which previously failed TestFuzz.
TEST_F(FuzzTest, TestFuzz2) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  vector<TestOp> test_ops = {
    {TEST_INSERT, 0},
    {TEST_DELETE, 0},
    {TEST_FLUSH_OPS},
    {TEST_FLUSH_TABLET},
    // (int32 key=1, int32 val=NULL)
    // Undo Mutations: [@1(DELETE)]
    // Redo Mutations: [@1(DELETE)]

    {TEST_INSERT, 0},
    {TEST_DELETE, 0},
    {TEST_INSERT, 0},
    {TEST_FLUSH_OPS},
    {TEST_FLUSH_TABLET},
    // (int32 key=1, int32 val=NULL)
    // Undo Mutations: [@2(DELETE)]
    // Redo Mutations: []

    {TEST_COMPACT_TABLET},
    // Output Row: (int32 key=1, int32 val=NULL)
    // Undo Mutations: [@1(DELETE)]
    // Redo Mutations: [@1(DELETE)]

    {TEST_DELETE, 0},
    {TEST_FLUSH_OPS},
    {TEST_COMPACT_TABLET},
  };
  RunFuzzCase(test_ops);
}

// A particular test case which previously failed TestFuzz.
TEST_F(FuzzTest, TestFuzz3) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  vector<TestOp> test_ops = {
    {TEST_INSERT, 0},
    {TEST_FLUSH_OPS},
    {TEST_FLUSH_TABLET},
    // Output Row: (int32 key=1, int32 val=NULL)
    // Undo Mutations: [@1(DELETE)]
    // Redo Mutations: []

    {TEST_DELETE, 0},
    // Adds a @2 DELETE to DMS for above row.

    {TEST_INSERT, 0},
    {TEST_DELETE, 0},
    {TEST_FLUSH_OPS},
    {TEST_FLUSH_TABLET},
    // (int32 key=1, int32 val=NULL)
    // Undo Mutations: [@2(DELETE)]
    // Redo Mutations: [@2(DELETE)]

    // Compaction input:
    // Row 1: (int32 key=1, int32 val=NULL)
    //   Undo Mutations: [@2(DELETE)]
    //   Redo Mutations: [@2(DELETE)]
    // Row 2: (int32 key=1, int32 val=NULL)
    //  Undo Mutations: [@1(DELETE)]
    //  Redo Mutations: [@2(DELETE)]

    {TEST_COMPACT_TABLET},
  };
  RunFuzzCase(test_ops);
}

// A particular test case which previously failed TestFuzz.
TEST_F(FuzzTest, TestFuzz4) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  vector<TestOp> test_ops = {
    {TEST_INSERT, 0},
    {TEST_FLUSH_OPS},
    {TEST_COMPACT_TABLET},
    {TEST_DELETE, 0},
    {TEST_FLUSH_OPS},
    {TEST_COMPACT_TABLET},
    {TEST_INSERT, 0},
    {TEST_UPDATE, 0},
    {TEST_FLUSH_OPS},
    {TEST_FLUSH_TABLET},
    {TEST_DELETE, 0},
    {TEST_INSERT, 0},
    {TEST_FLUSH_OPS},
    {TEST_FLUSH_TABLET},
    {TEST_UPDATE, 0},
    {TEST_FLUSH_OPS},
    {TEST_FLUSH_TABLET},
    {TEST_UPDATE, 0},
    {TEST_DELETE, 0},
    {TEST_INSERT, 0},
    {TEST_DELETE, 0},
    {TEST_FLUSH_OPS},
    {TEST_FLUSH_TABLET},
    {TEST_COMPACT_TABLET},
  };
  RunFuzzCase(test_ops);
}


TEST_F(FuzzTest, TestFuzz5) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  vector<TestOp> test_ops = {
    {TEST_UPSERT_PK_ONLY, 1},
    {TEST_FLUSH_OPS},
    {TEST_INSERT, 0},
    {TEST_SCAN_AT_TIMESTAMP, 5},
  };
  RunFuzzCase(test_ops);
}

// Previously caused incorrect data being read after restart.
// Failure:
//  Value of: val_in_table
//  Actual: "()"
//  Expected: "(" + cur_val + ")"
TEST_F(FuzzTest, TestFuzzWithRestarts1) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  RunFuzzCase({
      {TEST_INSERT, 1},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},
      {TEST_UPDATE, 1},
      {TEST_RESTART_TS},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_DELTAS},
      {TEST_INSERT, 0},
      {TEST_DELETE, 1},
      {TEST_INSERT, 1},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},
      {TEST_RESTART_TS},
      {TEST_MINOR_COMPACT_DELTAS},
      {TEST_COMPACT_TABLET},
      {TEST_UPDATE, 1},
      {TEST_FLUSH_OPS}
    });
}

// Previously caused KUDU-1341:
// deltafile.cc:134] Check failed: last_key_.CompareTo<UNDO>(key) <= 0 must
// insert undo deltas in sorted order (ascending key, then descending ts):
// got key (row 1@ts5965182714017464320) after (row 1@ts5965182713875046400)
TEST_F(FuzzTest, TestFuzzWithRestarts2) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  RunFuzzCase({
      {TEST_INSERT, 0},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},
      {TEST_DELETE, 0},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_DELTAS},
      {TEST_RESTART_TS},
      {TEST_INSERT, 1},
      {TEST_INSERT, 0},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},
      {TEST_DELETE, 0},
      {TEST_INSERT, 0},
      {TEST_UPDATE, 1},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},
      {TEST_FLUSH_DELTAS},
      {TEST_RESTART_TS},
      {TEST_UPDATE, 1},
      {TEST_DELETE, 1},
      {TEST_FLUSH_OPS},
      {TEST_RESTART_TS},
      {TEST_INSERT, 1},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},
      {TEST_RESTART_TS},
      {TEST_COMPACT_TABLET}
    });
}

// Regression test for KUDU-1467: a sequence involving
// UPSERT which failed to replay properly upon bootstrap.
TEST_F(FuzzTest, TestUpsertSeq) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  RunFuzzCase({
      {TEST_INSERT, 1},
      {TEST_UPSERT, 1},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},
      {TEST_UPSERT, 1},
      {TEST_DELETE, 1},
      {TEST_UPSERT, 1},
      {TEST_INSERT, 0},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},
      {TEST_RESTART_TS},
      {TEST_UPDATE, 1},
    });
}

// Regression test for KUDU-1623: updates without primary key
// columns specified can cause crashes and issues at restart.
TEST_F(FuzzTest, TestUpsert_PKOnlyOps) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  RunFuzzCase({
      {TEST_INSERT, 1},
      {TEST_FLUSH_OPS},
      {TEST_UPSERT_PK_ONLY, 1},
      {TEST_FLUSH_OPS},
      {TEST_RESTART_TS}
    });
}

// Regression test for KUDU-1905: reinserts to a tablet that has
// only primary keys end up as empty change lists. We were previously
// crashing when a changelist was empty.
TEST_F(FuzzTest, TestUpsert_PKOnlySchema) {
  CreateTabletAndStartClusterWithSchema(Schema({ColumnSchema("key", INT32)}, 1));
  RunFuzzCase({
      {TEST_UPSERT_PK_ONLY, 1},
      {TEST_DELETE, 1},
      {TEST_UPSERT_PK_ONLY, 1},
      {TEST_UPSERT_PK_ONLY, 1},
      {TEST_FLUSH_OPS}
     });
}

// MRS test for KUDU-2809: a row that has been inserted and deleted within the
// time range of a diff scan is excluded from the results.
TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanMRS) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  RunFuzzCase({
      {TEST_INSERT, 0},
      {TEST_FLUSH_OPS},
      {TEST_DELETE, 0},
      {TEST_FLUSH_OPS},
      {TEST_DIFF_SCAN, 4, 7}
    });
}

// DRS test for KUDU-2809: a row that has been inserted and deleted within the
// time range of a diff scan is excluded from the results.
TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanDRS) {
  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
  RunFuzzCase({
      {TEST_INSERT, 0},
      {TEST_FLUSH_OPS},
      {TEST_FLUSH_TABLET},
      {TEST_DELETE, 0},
      {TEST_FLUSH_OPS},
      {TEST_DIFF_SCAN, 4, 7}
    });
}

} // namespace tablet
} // namespace kudu

// Has to be defined outside of any namespace.
MAKE_ENUM_LIMITS(kudu::tablet::TestOpType,
                 kudu::tablet::TEST_INSERT,
                 kudu::tablet::TEST_NUM_OP_TYPES);
